package com.yxsk.relay.job.admin.core.executor.quartz;

import com.alibaba.fastjson.TypeReference;
import com.yxsk.relay.job.admin.core.executor.ListenableJobExecutor;
import com.yxsk.relay.job.admin.core.executor.listener.JobExecuteListener;
import com.yxsk.relay.job.admin.core.executor.quartz.context.QuartzRemoteJobExecuteContext;
import com.yxsk.relay.job.admin.core.remote.RemoteCallerSelector;
import com.yxsk.relay.job.admin.core.router.endpoint.EndpointRouterInfo;
import com.yxsk.relay.job.admin.core.schedule.RelayJobScheduleRegistrar;
import com.yxsk.relay.job.admin.core.schedule.constant.JobDataConstant;
import com.yxsk.relay.job.admin.core.schedule.quartz.vo.JobConfig;
import com.yxsk.relay.job.admin.data.entity.JobDetails;
import com.yxsk.relay.job.admin.data.entity.JobExecuteLog;
import com.yxsk.relay.job.admin.data.service.JobDetailService;
import com.yxsk.relay.job.admin.data.service.JobExecuteLogService;
import com.yxsk.relay.job.admin.exception.job.JobExecuteRuntimeException;
import com.yxsk.relay.job.component.admin.utils.SpringBeanUtils;
import com.yxsk.relay.job.component.common.constant.NetProtocol;
import com.yxsk.relay.job.component.common.constant.UriConstant;
import com.yxsk.relay.job.component.common.exception.RelayJobException;
import com.yxsk.relay.job.component.common.protocol.caller.RemoteCaller;
import com.yxsk.relay.job.component.common.protocol.message.base.ResultResponse;
import com.yxsk.relay.job.component.common.protocol.message.execute.JobExecuteRequest;
import com.yxsk.relay.job.component.common.protocol.message.execute.JobExecuteResponse;
import com.yxsk.relay.job.component.common.utils.DateUtils;
import com.yxsk.relay.job.component.common.vo.Endpoint;
import com.yxsk.relay.job.component.common.vo.ExecuteResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Job executor that dispatches a job to a remote endpoint, records the outcome in the
 * job execute log, retries on failure and triggers child jobs on success.
 *
 * <p>Collaborators (remote caller selector, log/detail services, schedule registrar) are
 * looked up through {@link SpringBeanUtils} at call time rather than injected.
 *
 * @Author 11376
 * @CreaTime 2019/6/28 9:47
 */
@Slf4j
public class QuartzRemoteJobExecutor extends ListenableJobExecutor<QuartzRemoteJobExecuteContext> {

    public QuartzRemoteJobExecutor(JobExecuteListener executeListener) {
        super(executeListener);
    }

    /**
     * Builds the execute request, sends it to the endpoint selected in the context and
     * hands the response to {@link #handleExecuteResponse}. Any exception raised along the
     * way is routed to {@link #handleRemoteExecuteError}, which may retry by calling this
     * method again.
     */
    @Override
    protected void doWork() {
        // Assemble the execution request and expose it on the context.
        JobExecuteRequest request = getJobExecuteRequest();
        this.context.setRequest(request);

        log.debug("执行远程任务请求：{}", request);

        try {
            EndpointRouterInfo endpointRouterInfo = this.context.getEndpointRouterInfo();

            beforeRemoteCall(endpointRouterInfo, request);

            Endpoint endpoint = endpointRouterInfo.getEndpoint();
            RemoteCaller remoteCaller = this.remoteCaller(endpoint.getProtocol());
            Map<String, String> requestHeader = buildRequestHeader(endpoint);

            ResultResponse<JobExecuteResponse> response = remoteCaller.call(request, requestHeader, buildExecuteUri(),
                    new TypeReference<ResultResponse<JobExecuteResponse>>() {
                    });
            this.context.setResponse(response);

            handleExecuteResponse(endpointRouterInfo, request, response);
        } catch (Exception e) {
            // NOTE(review): exceptions thrown by handleRemoteExecuteError from inside the try
            // block (via handleExecuteResponse) are caught here as well and handled a second
            // time — confirm whether that re-handling is intended.
            handleRemoteExecuteError(request, e);
        }
    }

    /**
     * Builds the request headers for the remote call.
     *
     * @param endpoint target endpoint
     * @return a map holding the authorization header when the endpoint has an auth token,
     * otherwise {@code null} (the value the remote caller received before this helper existed)
     */
    private Map<String, String> buildRequestHeader(Endpoint endpoint) {
        if (!StringUtils.hasLength(endpoint.getAuthToken())) {
            return null;
        }
        Map<String, String> requestHeader = new HashMap<>();
        requestHeader.put(UriConstant.AUTHORIZATION_HEADER_KEY, endpoint.getAuthToken());
        return requestHeader;
    }

    /**
     * Extension hook invoked right before the remote call; does nothing by default.
     *
     * @param endpointRouterInfo routing information of the selected endpoint
     * @param request            request about to be sent
     */
    protected void beforeRemoteCall(EndpointRouterInfo endpointRouterInfo, JobExecuteRequest request) {
    }

    /**
     * Resolves the remote caller for the given protocol through the {@link RemoteCallerSelector} bean.
     *
     * @param netProtocol protocol of the target endpoint
     * @return the caller selected for that protocol
     */
    protected RemoteCaller remoteCaller(NetProtocol netProtocol) {
        return SpringBeanUtils.getBean(RemoteCallerSelector.class).select(netProtocol);
    }

    /**
     * Builds the task-execute URI for the endpoint currently held by the context.
     *
     * @return URI built by the endpoint's remote caller from host, port and {@link UriConstant#TASK_EXECUTE_URI}
     */
    protected String buildExecuteUri() {
        EndpointRouterInfo endpointRouterInfo = this.context.getEndpointRouterInfo();
        Endpoint endpoint = endpointRouterInfo.getEndpoint();
        return this.remoteCaller(endpoint.getProtocol())
                .buildUri(endpoint.getHost(), endpoint.getPort(), UriConstant.TASK_EXECUTE_URI);
    }

    /**
     * Assembles the remote execute request from the current context.
     *
     * @return request carrying handler name, trigger parameter, serial number, request time,
     * protocol version and a retry flag (set when the context already has a retry count)
     * @Author 11376
     * @CreateTime 2019/6/15 21:07
     */
    protected JobExecuteRequest getJobExecuteRequest() {
        JobExecuteRequest request = new JobExecuteRequest();
        // NOTE(review): the job id is filled with the serial number, same as setSerialNo below —
        // confirm this is intended rather than the trigger's job id.
        request.setJobId(this.context.getSerialNo());
        request.setHandleName(this.context.getJobConfig().getHandlerName());
        request.setParam(this.context.getTriggerInfo().getTriggerParam());
        request.setRequestTime(DateUtils.getCurrentDate());
        request.setVersion(UriConstant.VERSION_NO);
        request.setSerialNo(this.context.getSerialNo());
        request.setRetry(this.context.getRetryTimes() != null);
        return request;
    }

    /**
     * Processes the response of a remote execution: persists the result to the execute log
     * and, when the response is not OK, forwards a {@link RelayJobException} to
     * {@link #handleRemoteExecuteError}.
     *
     * @param endpointRouterInfo routing information of the endpoint that was called
     * @param request            request that was sent
     * @param response           response received; may be {@code null}
     * @Author 11376
     * @CreateTime 2019/6/15 21:08
     */
    protected void handleExecuteResponse(EndpointRouterInfo endpointRouterInfo, JobExecuteRequest request, ResultResponse<JobExecuteResponse> response) {
        log.info("远程任务执行完成, 执行主机：{}, 任务信息：{}, 执行参数：{}, 请求：{}, 执行响应：{}", endpointRouterInfo, this.context.getJobConfig(), this.context.getTriggerInfo(), request, response);

        // Persist the execution result.
        handleExecuteResult(this.context.getSerialNo(), response == null ? null : response.getData());

        // Failure path.
        if (!ResultResponse.isOk(response)) {
            StringBuilder errorMsg = new StringBuilder("Remote job call error, ");
            if (response == null) {
                // A null response has no code or message to report; dereferencing it would
                // replace the real failure with a NullPointerException.
                errorMsg.append("response is null");
            } else {
                errorMsg.append("response code: ").append(response.getCode())
                        .append(", error message: ")
                        .append(response.getData() == null ? response.getMessage() : response.getData().getErrorMsg());
            }
            handleRemoteExecuteError(request, new RelayJobException(errorMsg.toString()));
        }
    }

    /**
     * Handles a failed remote execution: logs the error and, while the retry budget allows,
     * bumps the retry counters and re-runs {@link #doWork()}.
     *
     * <p>When no execute log exists for the current serial number the error is only logged.
     *
     * @param request request that failed
     * @param e       failure cause
     * @throws JobExecuteRuntimeException wrapping {@code e} when retries are disabled or exhausted
     * @Author 11376
     * @CreateTime 2019/6/15 21:06
     */
    protected void handleRemoteExecuteError(JobExecuteRequest request, Exception e) {
        log.error(MessageFormat.format("执行任务错误, 执行主机信息：{0}, 执行参数：{1}, 任务信息：{2}", this.context.getEndpointRouterInfo(), this.context.getTriggerInfo(), this.context.getJobConfig()), e);

        // Retry mechanism.
        JobExecuteLogService executeLogService = SpringBeanUtils.getBean(JobExecuteLogService.class);
        JobExecuteLog executeLog = executeLogService.findById(this.context.getSerialNo());
        if (executeLog == null) {
            log.warn("Execute log not found, skip retry, execute log id: {}", this.context.getSerialNo());
            return;
        }

        Integer retryTimes = this.context.getJobConfig().getFailRetryTimes();
        // A log that has never been retried may carry no retry count; treat it as zero
        // instead of failing on auto-unboxing.
        Integer loggedRetryTimes = executeLog.getRetryTimes();
        int alreadyRetried = loggedRetryTimes == null ? 0 : loggedRetryTimes;

        // Retry while the recorded retry count is less than or equal to the configured limit.
        // NOTE(review): "<=" permits one more retry than the configured count when the
        // counter starts at zero — confirm whether "<" was intended.
        boolean retryAllowed = retryTimes != null && retryTimes > 0 && alreadyRetried <= retryTimes;
        if (!retryAllowed) {
            throw new JobExecuteRuntimeException(e);
        }

        this.context.setRetryTimes(this.context.getRetryTimes() == null ? 1 : this.context.getRetryTimes() + 1);

        // Persist the new retry count.
        executeLog.setRetryTimes(this.context.getRetryTimes());
        executeLogService.updateById(executeLog);

        // Retry.
        this.doWork();
    }

    /**
     * Triggers the child jobs of the current job after a successful execution.
     *
     * <p>Only children in {@code NORMAL} status are triggered. Each child receives the current
     * serial number as parent trigger id; children in assembly-line mode additionally receive
     * {@code result} as their trigger parameter. Nothing is triggered when the current job's
     * execute model matches the cluster model.
     *
     * @param result execution result of the current job
     */
    protected void handleChildrenJob(String result) {
        JobDetailService detailService = SpringBeanUtils.getBean(JobDetailService.class);
        List<JobDetails> children = detailService.getChildrenJob(this.context.getTriggerInfo().getJobId());
        if (CollectionUtils.isEmpty(children)) {
            return;
        }

        // In cluster mode the children are to be triggered only after all tasks have finished,
        // so nothing is scheduled from here. The check does not depend on the child, so it is
        // evaluated once.
        // NOTE(review): handleSuccess compares the same getter against the enum constant with
        // "==" while this compares against getModel(); confirm which form matches the getter's type.
        boolean parentIsCluster = JobConfig.ExecuteModel.CLUSTER.getModel().equals(this.context.getJobConfig().getExecuteModel());
        if (parentIsCluster) {
            return;
        }

        RelayJobScheduleRegistrar scheduleRegistrar = SpringBeanUtils.getBean(RelayJobScheduleRegistrar.class);
        for (JobDetails jobDetail : children) {
            if (!JobConfig.JobStatus.NORMAL.getStatus().equals(jobDetail.getStatus())) {
                continue;
            }
            // Pass the parent trigger id to the child.
            Map<String, Object> paramMap = new HashMap<>(2);
            paramMap.put(JobDataConstant.EXECUTE_LOG_CHILD_TRIGGER_ID, this.context.getSerialNo());
            // Assembly-line children receive the parent's result as their trigger parameter.
            if (JobConfig.ExecuteModel.ASSEMBLY_LINE.getModel().equals(jobDetail.getExecuteModel())) {
                paramMap.put(JobDataConstant.CUSTOM_TRIGGER_PARAM_KEY, result);
            }
            scheduleRegistrar.runNow(detailService.convert(jobDetail), paramMap);
        }
    }

    /**
     * Writes the outcome of a remote execution into the execute log identified by
     * {@code executeLogId} and, on success, runs the success handling (child jobs).
     *
     * <p>A {@code null} response is recorded as an error. When no execute log exists for the
     * id, a warning is logged and nothing is updated.
     *
     * @param executeLogId id of the execute log to update
     * @param response     execution response returned by the remote side; may be {@code null}
     */
    protected void handleExecuteResult(String executeLogId, JobExecuteResponse response) {

        log.debug("Handle execute result, execute log id: {}, response: {}", executeLogId, response);

        JobExecuteLogService executeLogService = SpringBeanUtils.getBean(JobExecuteLogService.class);
        JobExecuteLog executeLog = executeLogService.findById(executeLogId);
        if (executeLog == null) {
            // Without a log record there is nothing to update; avoid a NullPointerException.
            log.warn("Execute log not found, skip result handling, execute log id: {}", executeLogId);
            return;
        }

        if (response == null) {
            executeLog.setEndTime(DateUtils.getCurrentDate());
            executeLog.setExecuteStatus(ExecuteResult.ExecuteResultCode.ERROR.getCode());
            executeLog.setExecuteErrorMessage("执行结果为空");
        } else if (ExecuteResult.ExecuteResultCode.OK.getCode().equals(response.getExecuteStatus())) {
            executeLog.setEndTime(response.getRespTime());
            executeLog.setExecuteStatus(ExecuteResult.ExecuteResultCode.OK.getCode());
            executeLog.setExecuteResult(response.getResult());

            // Success handling (runs before the log update below, as in the original flow).
            this.handleSuccess(executeLog);
        } else {
            executeLog.setEndTime(DateUtils.getCurrentDate());
            executeLog.setExecuteStatus(response.getExecuteStatus());
            executeLog.setExecuteErrorMessage(response.getErrorMsg());
        }

        // Persist the updated execute record.
        executeLogService.updateById(executeLog);
    }

    /**
     * Success handling: forwards the execution result stored in the log to the child jobs.
     *
     * @param executeLog execute log already populated with the successful result
     */
    private void handleSuccess(JobExecuteLog executeLog) {
        JobConfig jobConfig = this.context.getJobConfig();

        String result = executeLog.getExecuteResult();
        // Cluster jobs are meant to get dedicated result handling here.
        if (jobConfig.getExecuteModel() == JobConfig.ExecuteModel.CLUSTER) {
            // TODO handle cluster job execution result
        }
        // Trigger the child jobs.
        this.handleChildrenJob(result);
    }

}
